{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "import elasticsearch\n",
    "import json\n",
    "import re\n",
    "\n",
    "es = elasticsearch.Elasticsearch()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In this example, we use the [Google geocoding API](https://developers.google.com/maps/documentation/geocoding/) to translate addresses into geo-coordinates. Google imposes usages limits on the API. If you are using this script to index data, you many need to sign up for an API key to overcome limits. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from geopy.geocoders import GoogleV3\n",
    "geolocator = GoogleV3()\n",
    "# geolocator = GoogleV3(api_key=<your_google_api_key>)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Import Data\n",
    "Import restaurant inspection data into a Pandas dataframe "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "t = pd.read_csv('https://data.cityofnewyork.us/api/views/43nn-pn8j/rows.csv?accessType=DOWNLOAD', header=0, sep=',', dtype={'PHONE':str, 'INSPECTION DATE':str});"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "## Helper Functions\n",
    "from datetime import datetime\n",
    "def str_to_iso(text):\n",
    "    if text != '':\n",
    "        for fmt in (['%m/%d/%Y']):\n",
    "            try:\n",
    "                #print(fmt)\n",
    "                #print(datetime.strptime(text, fmt))\n",
    "                return datetime.isoformat(datetime.strptime(text, fmt))\n",
    "                \n",
    "            except ValueError:\n",
    "                #print(text)\n",
    "                pass        \n",
    "                #raise ValueError('Changing date')\n",
    "    else:\n",
    "        \n",
    "        return None\n",
    "    \n",
    "def getLatLon(row):\n",
    "    if row['Address'] != '':\n",
    "        location = geolocator.geocode(row['Address'], timeout=10000, sensor=False)\n",
    "        if location != None:\n",
    "            lat = location.latitude\n",
    "            lon = location.longitude\n",
    "            #print(lat,lon)\n",
    "            return [lon, lat]\n",
    "  \n",
    "    elif row['Zipcode'] !='' or location != None:    \n",
    "        location = geolocator.geocode(row['Zipcode'], timeout=10000, sensor=False)\n",
    "    \n",
    "        if location != None:\n",
    "            lat = location.latitude\n",
    "            lon = location.longitude\n",
    "            #print(lat,lon)\n",
    "            return [lon, lat]\n",
    "    else:\n",
    "        return None\n",
    "    \n",
    "def getAddress(row):\n",
    "    if row['Building'] != '' and row['Street'] != '' and row['Boro'] != '':\n",
    "        x = row['Building']+' '+row['Street']+' '+row['Boro']+',NY'\n",
    "        x = re.sub(' +',' ',x)\n",
    "        return x\n",
    "    else:\n",
    "        return ''\n",
    "    \n",
    "\n",
    "def combineCT(x):\n",
    "    \n",
    "    return str(x['Inspection_Date'][0][0:10])+'_'+str(x['Camis'])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Data preprocessing "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# process column names: remove spaces & use title casing \n",
    "t.columns = map(str.title, t.columns)  \n",
    "t.columns = map(lambda x: x.replace(' ', '_'), t.columns) \n",
    "\n",
    "# replace nan with ''\n",
    "t.fillna('', inplace=True)\n",
    "\n",
    "# Convert date to ISO format\n",
    "t['Inspection_Date'] = t['Inspection_Date'].map(lambda x: str_to_iso(x))\n",
    "t['Record_Date'] = t['Record_Date'].map(lambda x: str_to_iso(x))\n",
    "t['Grade_Date'] = t['Grade_Date'].map(lambda x: str_to_iso(x))\n",
    "#t['Inspection_Date'] = t['Inspection_Date'].map(lambda x: x.split('/'))\n",
    "\n",
    "# Combine Street, Building and Boro information to create Address string\n",
    "t['Address'] = t.apply(getAddress, axis=1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create a dictionary of unique Addresses. We do this to avoid calling the Google geocoding api multiple times for the same address"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "addDict = t[['Address','Zipcode']].copy(deep=True)\n",
    "addDict = addDict.drop_duplicates()\n",
    "addDict['Coord'] = [None]* len(addDict)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Get address for the geolocation for each address. This step can take a while because it's calling the Google geocoding API for each unique address. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "for item_id, item in addDict.iterrows():\n",
    "    if item_id % 100 == 0:\n",
    "        print(item_id)\n",
    "    if addDict['Coord'][item_id] == None:\n",
    "        addDict['Coord'][item_id] = getLatLon(item)\n",
    "    #print(addDict.loc[item_id]['Coord'])\n",
    "    \n",
    "# Save address dictionary to CSV\n",
    "#addDict.to_csv('./dict_final.csv')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Merge coordinates into original table\n",
    "t1 = t.merge(addDict[['Address', 'Coord']])\n",
    "\n",
    "# Keep only 1 value of score and grade per inspection \n",
    "t2=t1.copy(deep=True)\n",
    "t2['raw_num'] = t2.index\n",
    "t2['RI'] = t2.apply(combineCT, axis=1)\n",
    "yy = t2.groupby('RI').first().reset_index()['raw_num']\n",
    "\n",
    "t2['Unique_Score'] = None\n",
    "t2['Unique_Score'].loc[yy.values] = t2['Score'].loc[yy.values]\n",
    "t2['Unique_Grade'] = None\n",
    "t2['Unique_Grade'].loc[yy.values] = t2['Grade'].loc[yy.values]\n",
    "\n",
    "del(t2['RI'])\n",
    "del(t2['raw_num'])\n",
    "del(t2['Grade'])\n",
    "del(t2['Score'])\n",
    "\n",
    "t2.rename(columns={'Unique_Grade' : 'Grade','Unique_Score':'Score'}, inplace=True)\n",
    "t2['Grade'].fillna('', inplace=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "t2.iloc[1]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Index Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "### Create and configure Elasticsearch index\n",
    "\n",
    "# Name of index and document type\n",
    "index_name = 'nyc_restaurants';\n",
    "doc_name = 'inspection'\n",
    "\n",
    "# Delete donorschoose index if one does exist\n",
    "if es.indices.exists(index_name):\n",
    "    es.indices.delete(index_name)\n",
    "\n",
    "# Create donorschoose index\n",
    "es.indices.create(index_name)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Add mapping\n",
    "with open('./inspection_mapping.json') as json_mapping:\n",
    "    d = json.load(json_mapping)\n",
    "\n",
    "es.indices.put_mapping(index=index_name, doc_type=doc_name, body=d)\n",
    "\n",
    "# Index data\n",
    "for item_id, item in t2.iterrows():\n",
    "    if item_id % 1000 == 0:\n",
    "        print(item_id)\n",
    "    thisItem = item.to_dict()\n",
    "    #thisItem['Coord'] = getLatLon(thisItem)\n",
    "    thisDoc = json.dumps(thisItem);\n",
    "    #pprint.pprint(thisItem)\n",
    "\n",
    "    # write to elasticsearch\n",
    "    es.index(index=index_name, doc_type=doc_name, id=item_id, body=thisDoc)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    ""
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3.0
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.4.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}